package com.uptown.matrix.stream.builder;

import com.alibaba.fastjson.JSONObject;
import com.uptown.matrix.common.task.KafkaReaderMeta;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class kafkaTaskBuilder {

    public static KafkaSource<String> buildKafkaSource(KafkaReaderMeta kafkaReaderMeta) {

        return KafkaSource.<String>builder()
                .setBootstrapServers(kafkaReaderMeta.getBootServer())
                .setTopics(kafkaReaderMeta.getTopic())
                .setGroupId(kafkaReaderMeta.getGroupId())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
